Version: 0.11.4

AutoML

FindBestModel

from synapse.ml.automl import *
from synapse.ml.train import *
from pyspark.ml.classification import RandomForestClassifier

df = (spark.createDataFrame([
(0, 2, 0.50, 0.60, 0),
(1, 3, 0.40, 0.50, 1),
(0, 4, 0.78, 0.99, 2),
(1, 5, 0.12, 0.34, 3),
(0, 1, 0.50, 0.60, 0),
(1, 3, 0.40, 0.50, 1),
(0, 3, 0.78, 0.99, 2),
(1, 4, 0.12, 0.34, 3),
(0, 0, 0.50, 0.60, 0),
(1, 2, 0.40, 0.50, 1),
(0, 3, 0.78, 0.99, 2),
(1, 4, 0.12, 0.34, 3)
], ["Label", "col1", "col2", "col3", "col4"]))

# Train a candidate model (the same fitted model is passed twice to FindBestModel below)
randomForestClassifier = (TrainClassifier()
    .setModel(RandomForestClassifier()
        .setMaxBins(32)
        .setMaxDepth(5)
        .setMinInfoGain(0.0)
        .setMinInstancesPerNode(1)
        .setNumTrees(20)
        .setSubsamplingRate(1.0)
        .setSeed(0))
    .setFeaturesCol("mlfeatures")
    .setLabelCol("Label"))
model = randomForestClassifier.fit(df)

findBestModel = (FindBestModel()
    .setModels([model, model])
    .setEvaluationMetric("accuracy"))
bestModel = findBestModel.fit(df)
bestModel.transform(df).show()
Python API: FindBestModel | Scala API: FindBestModel | Source: FindBestModel

TuneHyperparameters

from synapse.ml.automl import *
from synapse.ml.train import *
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier


df = (spark.createDataFrame([
(0, 1, 1, 1, 1, 1, 1.0, 3, 1, 1),
(0, 1, 1, 1, 1, 2, 1.0, 1, 1, 1),
(0, 1, 1, 1, 1, 2, 1.0, 2, 1, 1),
(0, 1, 2, 3, 1, 2, 1.0, 3, 1, 1),
(0, 3, 1, 1, 1, 2, 1.0, 3, 1, 1)
], ["Label", "Clump_Thickness", "Uniformity_of_Cell_Size",
"Uniformity_of_Cell_Shape", "Marginal_Adhesion", "Single_Epithelial_Cell_Size",
"Bare_Nuclei", "Bland_Chromatin", "Normal_Nucleoli", "Mitoses"]))

logReg = LogisticRegression()
randForest = RandomForestClassifier()
gbt = GBTClassifier()
smlmodels = [logReg, randForest, gbt]
mmlmodels = [TrainClassifier(model=model, labelCol="Label") for model in smlmodels]

paramBuilder = (HyperparamBuilder()
    .addHyperparam(logReg, logReg.regParam, RangeHyperParam(0.1, 0.3))
    .addHyperparam(randForest, randForest.numTrees, DiscreteHyperParam([5, 10]))
    .addHyperparam(randForest, randForest.maxDepth, DiscreteHyperParam([3, 5]))
    .addHyperparam(gbt, gbt.maxBins, RangeHyperParam(8, 16))
    .addHyperparam(gbt, gbt.maxDepth, DiscreteHyperParam([3, 5])))
searchSpace = paramBuilder.build()
# The search space is a list of (param, (estimator, hyperparam)) pairs
randomSpace = RandomSpace(searchSpace)

bestModel = TuneHyperparameters(
    evaluationMetric="accuracy", models=mmlmodels, numFolds=2,
    numRuns=len(mmlmodels) * 2, parallelism=2,
    paramSpace=randomSpace.space(), seed=0).fit(df)
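
To see which candidate won and to score data with it, the fitted model can be inspected and applied. A minimal sketch, assuming the getBestModelInfo accessor used in SynapseML's hyperparameter-tuning examples:

# Print the winning model's hyperparameters, then score the training data.
print(bestModel.getBestModelInfo())
bestModel.transform(df).show()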
Python API: TuneHyperparameters | Scala API: TuneHyperparameters | Source: TuneHyperparameters

Featurize

CleanMissingData

from synapse.ml.featurize import *

dataset = spark.createDataFrame([
(0, 2, 0.50, 0.60, 0),
(1, 3, 0.40, None, None),
(0, 4, 0.78, 0.99, 2),
(1, 5, 0.12, 0.34, 3),
(0, 1, 0.50, 0.60, 0),
(None, None, None, None, None),
(0, 3, 0.78, 0.99, 2),
(1, 4, 0.12, 0.34, 3),
(0, None, 0.50, 0.60, 0),
(1, 2, 0.40, 0.50, None),
(0, 3, None, 0.99, 2),
(1, 4, 0.12, 0.34, 3)
], ["col1", "col2", "col3", "col4", "col5"])

cmd = (CleanMissingData()
    .setInputCols(dataset.columns)
    .setOutputCols(dataset.columns)
    .setCleaningMode("Mean"))
Python API: CleanMissingData | Scala API: CleanMissingData | Source: CleanMissingData

CountSelector

from synapse.ml.featurize import *
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([
(Vectors.sparse(3, [(0, 1.0), (2, 2.0)]), Vectors.dense(1.0, 0.1, 0)),
(Vectors.sparse(3, [(0, 1.0), (2, 2.0)]), Vectors.dense(1.0, 0.1, 0))
], ["col1", "col2"])

cs = CountSelector().setInputCol("col1").setOutputCol("col3")

cs.fit(df).transform(df).show()
Python API: CountSelector | Scala API: CountSelector | Source: CountSelector

Featurize

from synapse.ml.featurize import *

dataset = spark.createDataFrame([
(0, 2, 0.50, 0.60, "pokemon are everywhere"),
(1, 3, 0.40, 0.50, "they are in the woods"),
(0, 4, 0.78, 0.99, "they are in the water"),
(1, 5, 0.12, 0.34, "they are in the fields"),
(0, 3, 0.78, 0.99, "pokemon - gotta catch em all")
], ["Label", "col1", "col2", "col3"])

feat = (Featurize()
    .setNumFeatures(10)
    .setOutputCol("testColumn")
    .setInputCols(["col1", "col2", "col3"])
    .setOneHotEncodeCategoricals(False))

feat.fit(dataset).transform(dataset).show()
Python API: Featurize | Scala API: Featurize | Source: Featurize

ValueIndexer

from synapse.ml.featurize import *

df = spark.createDataFrame([
(-3, 24, 0.32534, True, "piano"),
(1, 5, 5.67, False, "piano"),
(-3, 5, 0.32534, False, "guitar")
], ["int", "long", "double", "bool", "string"])

vi = ValueIndexer().setInputCol("string").setOutputCol("string_cat")

vi.fit(df).transform(df).show()
Python API: ValueIndexer | Scala API: ValueIndexer | Source: ValueIndexer

Featurize Text

TextFeaturizer

from synapse.ml.featurize.text import *

dfRaw = spark.createDataFrame([
(0, "Hi I"),
(1, "I wish for snow today"),
(2, "we Cant go to the park, because of the snow!"),
(3, "")
], ["label", "sentence"])

tfRaw = (TextFeaturizer()
    .setInputCol("sentence")
    .setOutputCol("features")
    .setNumFeatures(20))

tfRaw.fit(dfRaw).transform(dfRaw).show()
Python API: TextFeaturizer | Scala API: TextFeaturizer | Source: TextFeaturizer

Isolation Forest

IsolationForest

from synapse.ml.isolationforest import *

isolationForest = (IsolationForest()
    .setNumEstimators(100)
    .setBootstrap(False)
    .setMaxSamples(256)
    .setMaxFeatures(1.0)
    .setFeaturesCol("features")
    .setPredictionCol("predictedLabel")
    .setScoreCol("outlierScore")
    .setContamination(0.02)
    .setContaminationError(0.02 * 0.01)
    .setRandomSeed(1))
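
The snippet above only configures the estimator. A minimal fit/score sketch on hypothetical toy data (a vector column named "features", sized so it exceeds the maxSamples of 256 set above):

import random
from pyspark.ml.linalg import Vectors

random.seed(0)
# Hypothetical toy data: ~300 inliers near the origin plus two obvious outliers.
rows = [(Vectors.dense(random.gauss(0.0, 1.0), random.gauss(0.0, 1.0)),) for _ in range(300)]
rows += [(Vectors.dense(20.0, 20.0),), (Vectors.dense(-20.0, 18.0),)]
points = spark.createDataFrame(rows, ["features"])

# transform adds the configured outlierScore and predictedLabel columns.
isolationForestModel = isolationForest.fit(points)
isolationForestModel.transform(points).show(5)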
Python API: IsolationForest | Scala API: IsolationForest | Source: IsolationForest

NN

ConditionalKNN

from synapse.ml.nn import *

cknn = (ConditionalKNN()
    .setOutputCol("matches")
    .setFeaturesCol("features"))
Python API: ConditionalKNN | Scala API: ConditionalKNN | Source: ConditionalKNN

KNN

from synapse.ml.nn import *

knn = (KNN()
    .setOutputCol("matches"))
Python API: KNN | Scala API: KNN | Source: KNN

Recommendation

RecommendationIndexer, RankingEvaluator, RankingAdapter and RankingTrainValidationSplit

from synapse.ml.recommendation import *
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import *

ratings = (spark.createDataFrame([
("11", "Movie 01", 2),
("11", "Movie 03", 1),
("11", "Movie 04", 5),
("11", "Movie 05", 3),
("11", "Movie 06", 4),
("11", "Movie 07", 1),
("11", "Movie 08", 5),
("11", "Movie 09", 3),
("22", "Movie 01", 4),
("22", "Movie 02", 5),
("22", "Movie 03", 1),
("22", "Movie 05", 3),
("22", "Movie 06", 3),
("22", "Movie 07", 5),
("22", "Movie 08", 1),
("22", "Movie 10", 3),
("33", "Movie 01", 4),
("33", "Movie 03", 1),
("33", "Movie 04", 5),
("33", "Movie 05", 3),
("33", "Movie 06", 4),
("33", "Movie 08", 1),
("33", "Movie 09", 5),
("33", "Movie 10", 3),
("44", "Movie 01", 4),
("44", "Movie 02", 5),
("44", "Movie 03", 1),
("44", "Movie 05", 3),
("44", "Movie 06", 4),
("44", "Movie 07", 5),
("44", "Movie 08", 1),
("44", "Movie 10", 3)
], ["customerIDOrg", "itemIDOrg", "rating"])
.dropDuplicates()
.cache())

recommendationIndexer = (RecommendationIndexer()
    .setUserInputCol("customerIDOrg")
    .setUserOutputCol("customerID")
    .setItemInputCol("itemIDOrg")
    .setItemOutputCol("itemID")
    .setRatingCol("rating"))

transformedDf = (recommendationIndexer.fit(ratings)
.transform(ratings).cache())

als = (ALS()
    .setNumUserBlocks(1)
    .setNumItemBlocks(1)
    .setUserCol("customerID")
    .setItemCol("itemID")
    .setRatingCol("rating")
    .setSeed(0))

evaluator = (RankingEvaluator()
    .setK(3)
    .setNItems(10))

adapter = (RankingAdapter()
    .setK(evaluator.getK())
    .setRecommender(als))

adapter.fit(transformedDf).transform(transformedDf).show()

paramGrid = (ParamGridBuilder()
    .addGrid(als.regParam, [1.0])
    .build())

tvRecommendationSplit = (RankingTrainValidationSplit()
    .setEstimator(als)
    .setEvaluator(evaluator)
    .setEstimatorParamMaps(paramGrid)
    .setTrainRatio(0.8)
    .setUserCol(recommendationIndexer.getUserOutputCol())
    .setItemCol(recommendationIndexer.getItemOutputCol())
    .setRatingCol("rating"))

tvRecommendationSplit.fit(transformedDf).transform(transformedDf).show()
Python API: RecommendationIndexer | Scala API: RecommendationIndexer | Source: RecommendationIndexer
Python API: RankingEvaluator | Scala API: RankingEvaluator | Source: RankingEvaluator
Python API: RankingAdapter | Scala API: RankingAdapter | Source: RankingAdapter
Python API: RankingTrainValidationSplit | Scala API: RankingTrainValidationSplit | Source: RankingTrainValidationSplit

SAR

from synapse.ml.recommendation import *

ratings = (spark.createDataFrame([
("11", "Movie 01", 2),
("11", "Movie 03", 1),
("11", "Movie 04", 5),
("11", "Movie 05", 3),
("11", "Movie 06", 4),
("11", "Movie 07", 1),
("11", "Movie 08", 5),
("11", "Movie 09", 3),
("22", "Movie 01", 4),
("22", "Movie 02", 5),
("22", "Movie 03", 1),
("22", "Movie 05", 3),
("22", "Movie 06", 3),
("22", "Movie 07", 5),
("22", "Movie 08", 1),
("22", "Movie 10", 3),
("33", "Movie 01", 4),
("33", "Movie 03", 1),
("33", "Movie 04", 5),
("33", "Movie 05", 3),
("33", "Movie 06", 4),
("33", "Movie 08", 1),
("33", "Movie 09", 5),
("33", "Movie 10", 3),
("44", "Movie 01", 4),
("44", "Movie 02", 5),
("44", "Movie 03", 1),
("44", "Movie 05", 3),
("44", "Movie 06", 4),
("44", "Movie 07", 5),
("44", "Movie 08", 1),
("44", "Movie 10", 3)
], ["customerIDOrg", "itemIDOrg", "rating"])
.dropDuplicates()
.cache())

recommendationIndexer = (RecommendationIndexer()
    .setUserInputCol("customerIDOrg")
    .setUserOutputCol("customerID")
    .setItemInputCol("itemIDOrg")
    .setItemOutputCol("itemID")
    .setRatingCol("rating"))

algo = (SAR()
    .setUserCol("customerID")
    .setItemCol("itemID")
    .setRatingCol("rating")
    .setTimeCol("timestamp")
    .setSupportThreshold(1)
    .setSimilarityFunction("jaccard")
    .setActivityTimeFormat("EEE MMM dd HH:mm:ss Z yyyy"))

adapter = (RankingAdapter()
    .setK(5)
    .setRecommender(algo))

res1 = recommendationIndexer.fit(ratings).transform(ratings).cache()

adapter.fit(res1).transform(res1).show()
Python API: SAR | Scala API: SAR | Source: SAR

Stages

ClassBalancer

from synapse.ml.stages import *

df = (spark.createDataFrame([
(0, 1.0, "Hi I"),
(1, 1.0, "I wish for snow today"),
(2, 2.0, "I wish for snow today"),
(3, 2.0, "I wish for snow today"),
(4, 2.0, "I wish for snow today"),
(5, 2.0, "I wish for snow today"),
(6, 0.0, "I wish for snow today"),
(7, 1.0, "I wish for snow today"),
(8, 0.0, "we Cant go to the park, because of the snow!"),
(9, 2.0, "")
], ["index", "label", "sentence"]))

cb = ClassBalancer().setInputCol("label")

cb.fit(df).transform(df).show()
Python API: ClassBalancer | Scala API: ClassBalancer | Source: ClassBalancer

MultiColumnAdapter

from synapse.ml.stages import *
from pyspark.ml.feature import Tokenizer

df = (spark.createDataFrame([
(0, "This is a test", "this is one too"),
(1, "could be a test", "bar"),
(2, "foo", "bar"),
(3, "foo", "maybe not")
], ["label", "words1", "words2"]))

stage1 = Tokenizer()
mca = (MultiColumnAdapter()
    .setBaseStage(stage1)
    .setInputCols(["words1", "words2"])
    .setOutputCols(["output1", "output2"]))

mca.fit(df).transform(df).show()
Python API: MultiColumnAdapter | Scala API: MultiColumnAdapter | Source: MultiColumnAdapter

Timer

from synapse.ml.stages import *
from pyspark.ml.feature import *

df = (spark.createDataFrame([
(0, "Hi I"),
(1, "I wish for snow today"),
(2, "we Cant go to the park, because of the snow!"),
(3, "")
], ["label", "sentence"]))

tok = (Tokenizer()
    .setInputCol("sentence")
    .setOutputCol("tokens"))

df2 = Timer().setStage(tok).fit(df).transform(df)

df3 = HashingTF().setInputCol("tokens").setOutputCol("hash").transform(df2)

idf = IDF().setInputCol("hash").setOutputCol("idf")
timer = Timer().setStage(idf)

timer.fit(df3).transform(df3).show()
Python API: Timer | Scala API: Timer | Source: Timer

Train

TrainClassifier

from synapse.ml.train import *
from pyspark.ml.classification import LogisticRegression

df = spark.createDataFrame([
(0, 2, 0.50, 0.60, 0),
(1, 3, 0.40, 0.50, 1),
(0, 4, 0.78, 0.99, 2),
(1, 5, 0.12, 0.34, 3),
(0, 1, 0.50, 0.60, 0),
(1, 3, 0.40, 0.50, 1),
(0, 3, 0.78, 0.99, 2),
(1, 4, 0.12, 0.34, 3),
(0, 0, 0.50, 0.60, 0),
(1, 2, 0.40, 0.50, 1),
(0, 3, 0.78, 0.99, 2),
(1, 4, 0.12, 0.34, 3)],
["Label", "col1", "col2", "col3", "col4"]
)

tc = (TrainClassifier()
    .setModel(LogisticRegression())
    .setLabelCol("Label"))

tc.fit(df).transform(df).show()
Python API: TrainClassifier | Scala API: TrainClassifier | Source: TrainClassifier

TrainRegressor

from synapse.ml.train import *
from pyspark.ml.regression import LinearRegression

dataset = (spark.createDataFrame([
(0.0, 2, 0.50, 0.60, 0.0),
(1.0, 3, 0.40, 0.50, 1.0),
(2.0, 4, 0.78, 0.99, 2.0),
(3.0, 5, 0.12, 0.34, 3.0),
(0.0, 1, 0.50, 0.60, 0.0),
(1.0, 3, 0.40, 0.50, 1.0),
(2.0, 3, 0.78, 0.99, 2.0),
(3.0, 4, 0.12, 0.34, 3.0),
(0.0, 0, 0.50, 0.60, 0.0),
(1.0, 2, 0.40, 0.50, 1.0),
(2.0, 3, 0.78, 0.99, 2.0),
(3.0, 4, 0.12, 0.34, 3.0)],
["label", "col1", "col2", "col3", "col4"]))

linearRegressor = (LinearRegression()
    .setRegParam(0.3)
    .setElasticNetParam(0.8))
trainRegressor = (TrainRegressor()
    .setModel(linearRegressor)
    .setLabelCol("label"))

trainRegressor.fit(dataset).transform(dataset).show()
Python API: TrainRegressor | Scala API: TrainRegressor | Source: TrainRegressor